import pandas as pd
import numpy as np
import subprocess
import os
import plotly.express as px
import plotly.graph_objects as go
import math
# Mount points handed to the ttfs binary: one for writing, one for reading back.
WRITE_PATH = "/mnt/ttfs"
READ_PATH = "/mnt1/ttfs"
# Target total bytes written per epoch; generate_data() rebalances -n/-f so
# workers * files * records * record_len stays close to this value.
TOTAL_LOAD_PER_EPOCH = 1024*1024*256 #256MB
# Parameter grid swept by generate_data():
LST_WORKERS = [20,24,28,32,36,40,44,48,52,56,60,64]
LST_RECORDLEN = [256,512,768,1024]
LST_RECORDNUM = [4096, 8192, 12288]
class fsXT(object):
    """Filesystem throughput/reliability test harness driving the ttfs binary.

    Workflow: generate_data() writes a batch script with one ttfs command
    line per epoch; run_test() launches it in the background; collect_data()
    gathers the per-epoch logs from /tmp into /tmp/fsXTlog; wrangle_data()
    loads them into DataFrames and scores each epoch; the remaining methods
    analyze and plot the results.
    """

    def __init__(self):
        # Epoch bookkeeping; len_epochs is filled in by generate_data().
        self.epochs = dict()
        self.len_epochs = 0
        self.pwd = os.getcwd()
        # Generated batch script: one ttfs command line per epoch.
        self.testscriptfile = '%s/batch' % self.pwd

    def generate_data(self):
        """Build the batch test script, one unique ttfs command per epoch.

        For each (workers, record_length, record_number) combination the
        per-worker file count (-f) and record count (-n) are rebalanced so
        every epoch writes roughly TOTAL_LOAD_PER_EPOCH bytes in total.
        A set de-duplicates combinations that rebalance to the same line.
        """
        es = set()
        for arg_p in LST_WORKERS:
            for arg_l in LST_RECORDLEN:
                for arg_n in LST_RECORDNUM:
                    # Files per worker so that p*l*n*f ~= TOTAL_LOAD_PER_EPOCH.
                    arg_f = max(1, int(TOTAL_LOAD_PER_EPOCH / (arg_p * arg_l * arg_n)))
                    # Re-derive n so the actual total load stays near the target.
                    arg_n = max(1, int(TOTAL_LOAD_PER_EPOCH / (arg_p * arg_f * arg_l)))
                    cmdline = "%s/ttfs -w%s -r%s -p%d -l%d -n%d -f%d" % \
                        (self.pwd, WRITE_PATH, READ_PATH, arg_p, arg_l, arg_n, arg_f)
                    es.add(cmdline)
        # Context manager guarantees the script file is closed even on error.
        with open(self.testscriptfile, 'w') as sf:
            for arg_e, i in enumerate(es, start=1):
                sf.write(i + ' -e%d\n' % arg_e)
        self.len_epochs = len(es)
        print('The test script file was generated and save in %s' % self.testscriptfile)

    def run_test(self):
        """Launch the generated batch script in the background via nohup."""
        cmdline = 'chmod +x %s;nohup %s &' % (self.testscriptfile, self.testscriptfile)
        print(cmdline)
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
        process.communicate()
        print('NOTICE: THIS IS A TIME CONSUMING JOB. YOU SHOULD WAIT UNTIL ALL EPOCHS FINISHED. PLEASE CHECK ITS STATUS ON HOST.')

    def collect_data(self):
        """Gather per-epoch ttfs logs into three merged log files.

        Moves /tmp/ttfslog.NNNN into /tmp/fsXTlog and splits each log by
        record type (prefix 1/2/3), tagging rows with their epoch number:
        files.log, timeticks.log and epochs.log.
        """
        cmdline = \
        """
        mkdir -p /tmp/fsXTlog;pushd /tmp/fsXTlog;rm files.log timeticks.log epochs.log -f;mv ../ttfslog.???? ./ -f;
        for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^1,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> files.log;done
        for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^2,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> timeticks.log;done
        for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^3,"|cut -f1 -d"," --complement >> epochs.log;done
        """ % (self.len_epochs, self.len_epochs, self.len_epochs)
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
        process.communicate()
        print('Finished collecting data from /tmp/ttfslog.*, and save data to /tmp/fsXTlog/files.log, timeticks.log and epochs.log ')

    def wrangle_data(self):
        """Load the collected logs and derive per-epoch scores and ranks.

        Produces self.df_epochs / self.df_files / self.df_timeticks with
        derived throughput columns (MB/s), then a 0-100 min-max normalized
        score per epoch weighted 70% epoch speed, 20% file-write speed,
        10% file-read speed; df_epochs ends up sorted best-first with a
        1-based 'rank' column.
        """
        self.df_epochs = pd.read_csv("/tmp/fsXTlog/epochs.log", header=None)
        self.df_epochs.columns = ['epoch', 'workers', 'record_length', 'record_number',
                                  'files', 'start_timestamp', 'end_timestamp']
        self.df_files = pd.read_csv("/tmp/fsXTlog/files.log", header=None)
        self.df_files.columns = ['epoch', 'worker_id', 'filename', 'record_length', 'record_number',
                                 'md5_write', 'md5_read',
                                 'tlen_open', 'tlen_close', 'tlen_mv', 'tlen_write', 'tlen_read']
        self.df_timeticks = pd.read_csv("/tmp/fsXTlog/timeticks.log", header=None)
        self.df_timeticks.columns = ['epoch', 'worker_id', 'checkpoint',
                                     'unit_wbytes', 'unit_rbytes', 'elapsed_time']
        # Bytes -> MB for plotting.
        self.df_timeticks['unit_wbytes'] = self.df_timeticks['unit_wbytes'] / 1024**2
        self.df_timeticks['unit_rbytes'] = self.df_timeticks['unit_rbytes'] / 1024**2
        # Derived per-epoch totals and overall epoch throughput (MB/s).
        self.df_epochs['total_files'] = self.df_epochs['files'] * self.df_epochs['workers']
        self.df_epochs['total_records'] = self.df_epochs['total_files'] * self.df_epochs['record_number']
        self.df_epochs['file_size'] = self.df_epochs['record_length'] * self.df_epochs['record_number']
        self.df_epochs['duration'] = self.df_epochs['end_timestamp'] - self.df_epochs['start_timestamp']
        self.df_epochs['total_load'] = self.df_epochs['total_files'] * self.df_epochs['file_size']
        self.df_epochs['e_speed'] = self.df_epochs['total_load'] / self.df_epochs['duration'] / 1024**2
        self.df_epochs.index = self.df_epochs['epoch']
        self.df_epochs.index.name = ""
        # Total wall time per file, then per-file write/read throughput (MB/s).
        self.df_files['tlen_file'] = (self.df_files['tlen_open'] + self.df_files['tlen_close']
                                      + self.df_files['tlen_mv'] + self.df_files['tlen_write']
                                      + self.df_files['tlen_read'])
        self.df_files.drop(columns=['record_length', 'record_number'], inplace=True)
        self.df_files = pd.merge(self.df_files, self.df_epochs, how='left',
                                 left_on=['epoch'], right_on=['epoch'])
        self.df_files['fw_speed'] = self.df_files['file_size'] / self.df_files['tlen_write'] / 1024**2
        self.df_files['fr_speed'] = self.df_files['file_size'] / self.df_files['tlen_read'] / 1024**2
        # min/max/range per metric, for min-max normalization.
        # BUGFIX: was `xt.df_files` (the module-level instance) instead of self.
        df_score = self.df_files[['e_speed', 'fw_speed', 'fr_speed']].agg(['min', 'max'])
        # BUGFIX: DataFrame.append() was removed in pandas 2.0; assign the row.
        df_score.loc['range'] = df_score.loc['max'] - df_score.loc['min']
        df_score_rst = self.df_files[['epoch', 'e_speed', 'fw_speed', 'fr_speed']].groupby(['epoch']).mean()
        df_score_rst['epoch_score'] = (df_score_rst['e_speed'] - df_score.loc['min', 'e_speed']) / df_score.loc['range', 'e_speed'] * 100.0
        df_score_rst['fw_score'] = (df_score_rst['fw_speed'] - df_score.loc['min', 'fw_speed']) / df_score.loc['range', 'fw_speed'] * 100.0
        df_score_rst['fr_score'] = (df_score_rst['fr_speed'] - df_score.loc['min', 'fr_speed']) / df_score.loc['range', 'fr_speed'] * 100.0
        # Weighted overall score: epoch throughput dominates.
        df_score_rst['score'] = (df_score_rst['epoch_score'] * 0.7
                                 + df_score_rst['fw_score'] * 0.2
                                 + df_score_rst['fr_score'] * 0.1)
        self.df_epochs = pd.merge(self.df_epochs,
                                  df_score_rst[['fw_speed', 'fr_speed', 'epoch_score', 'fw_score', 'fr_score', 'score']],
                                  how='inner', left_index=True, right_index=True)
        self.df_epochs = self.df_epochs.sort_values('score', ascending=False).reset_index(drop=True)
        self.df_epochs['rank'] = self.df_epochs.index + 1
        print('Finished wrangling data.')

    def eval_reliability(self):
        """Verify integrity: write and read MD5 digests must match per file."""
        df_nonreliability = self.df_files.query('md5_write!=md5_read')
        if len(df_nonreliability) > 0:
            print('!!!!!!! Fail to pass Reliablity Test !!!!!!!')
            print('------The following files have different digests of writing and reading:-----')
            # BUGFIX: print only the mismatching rows, not the whole df_files.
            print(df_nonreliability)
        else:
            print('### Success to pass Reliablity Test ###')

    def analyze_correlationship(self):
        """Report which test parameters correlate most with each speed metric."""
        tmpdf = self.df_epochs[['workers', 'record_length', 'record_number', 'files',
                                'total_files', 'total_records', 'file_size', 'e_speed']]
        self.__analyze_correlationship(tmpdf, 'e_speed', 'EPOCHS PERFORMACE')
        tmpdf = self.df_files[['workers', 'record_length', 'record_number', 'total_records',
                               'file_size', 'files', 'total_files', 'fw_speed', 'fr_speed']]
        self.__analyze_correlationship(tmpdf, 'fw_speed', 'WORKER PERFORMANCE')
        self.__analyze_correlationship(tmpdf, 'fr_speed', 'WORKER PERFORMANCE')

    def __analyze_correlationship(self, df, basic_col, df_label):
        """Print df's correlations against basic_col, strongest (by |r|) first."""
        sr_corr = df.corr()[basic_col]
        sr_corr.dropna(inplace=True)
        sr_corr.drop(labels=[basic_col], inplace=True)
        order = sr_corr.abs().sort_values(ascending=False)
        print('### Analyzing result of %s based on "%s" ###' % (df_label, basic_col))
        print(sr_corr[order.index])

    def histogram_wrspeed(self, li):
        """Plot stacked write/read MB-per-0.1s histograms for the given epochs.

        li: iterable of epoch ids (e.g. a slice of df_epochs['epoch']).
        """
        for i, e in enumerate(li, start=1):
            tmpdf = self.df_timeticks.query('epoch==%d' % e)
            fig = go.Figure()
            trace_write = go.Histogram(histfunc="sum", y=tmpdf['unit_wbytes'], x=tmpdf['elapsed_time'],
                                       name="Epoch%d_writing" % e, xbins=go.histogram.XBins(start=0, size=0.1))
            trace_read = go.Histogram(histfunc="sum", y=tmpdf['unit_rbytes'], x=tmpdf['elapsed_time'],
                                      name="Epoch%d_reading" % e, xbins=go.histogram.XBins(start=0, size=0.1))
            fig.add_trace(trace_write)
            fig.add_trace(trace_read)
            # BUGFIX: was `xt.df_epochs.loc[e]` — after reset_index(drop=True)
            # the frame is positionally indexed by rank, so .loc[e] fetched the
            # wrong row (and `xt` is the module-level instance, not self).
            es = self.df_epochs.loc[self.df_epochs['epoch'] == e].iloc[0]
            fig.update_layout(barmode="stack", bargap=0.1,
                              xaxis_title='Elapsed time (seconds)',
                              yaxis_title='MBytes')
            print('Rank: %d\n'
                  'Epoch: %d, Spent time: %.2f seconds,\n'
                  'Record length: %d, Records: %d,\n'
                  'Workers: %d, Total files: %d' %
                  (i, e, es['duration'], es['record_length'],
                   es['record_number'], es['workers'], es['total_files']))
            fig.show()
# ---- driver: generate the batch script, collect logs, analyze and plot ----
xt = fsXT()
xt.generate_data()
#xt.run_test()
xt.collect_data()
xt.wrangle_data()
xt.eval_reliability()
xt.analyze_correlationship()

# df_epochs is already sorted by score (rank order == index order), no sort needed.
_report_cols = ['rank', 'epoch', 'workers', 'record_length', 'record_number',
                'duration', 'total_files', 'e_speed', 'fw_speed', 'fr_speed',
                'epoch_score', 'fw_score', 'fr_score', 'score']
# BUGFIX: these tables were evaluated and discarded (notebook residue — a bare
# expression displays nothing when run as a script); print them explicitly.
print(xt.df_epochs[:10][_report_cols])
print(xt.df_epochs[-10:][_report_cols])

# Per-epoch write/read histograms for the 3 best and 3 worst scoring epochs.
xt.histogram_wrspeed(xt.df_epochs.sort_values(['score'], ascending=False)[:3]['epoch'])
xt.histogram_wrspeed(xt.df_epochs.sort_values(['score'], ascending=False)[-3:]['epoch'])

# Bucket the time ticks into whole-second bins and sum MB moved per bin.
xt.df_timeticks['bin_second'] = xt.df_timeticks['elapsed_time'].apply(math.ceil)
df_wrspeed = xt.df_timeticks[['epoch', 'bin_second', 'unit_wbytes', 'unit_rbytes']] \
    .groupby(['epoch', 'bin_second']).sum()
df_wrspeed['elapsed_seconds'] = df_wrspeed.index.get_level_values(1)
df_wrspeed['EPOCH'] = df_wrspeed.index.get_level_values(0)
# Map each epoch id to its rank (replaces the convoluted .T.apply construction).
kvs = dict(zip(xt.df_epochs['epoch'], xt.df_epochs['rank']))
df_wrspeed['rank'] = df_wrspeed.index.get_level_values(0)
df_wrspeed['rank'] = df_wrspeed['rank'].map(kvs)

# Throughput-over-time line chart for the top-3 and bottom-3 epochs.
top3_bottom3 = list(xt.df_epochs[:3].index) + list(xt.df_epochs[-3:].index)
fig = px.line(df_wrspeed.query('epoch in %s' % list(xt.df_epochs['epoch'].loc[top3_bottom3])),
              x='elapsed_seconds', y="unit_wbytes", color="rank",
              line_group="rank", hover_name="EPOCH")
fig.show()

# Distribution of per-file write/read speeds within each epoch.
fig = px.box(xt.df_files, x="epoch", y="fw_speed")
fig.show()
fig = px.box(xt.df_files, x="epoch", y="fr_speed")
fig.show()

#df=xt.df_files
#df=df.set_index([df.epoch,df.index])
#df.std(level=0).sort_values(['fw_speed','fr_speed'])
# Precondition: the same data load is written in every epoch.